package com.icbc.dccsh.logprocessor;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.icbc.dccsh.bolt.HashBolt;
import com.icbc.dccsh.bolt.JsonBolt;
import com.icbc.dccsh.bolt.NeedHistoryBolt;
import com.icbc.dccsh.bolt.CounterBolt;
import com.icbc.dccsh.bolt.FlatFileBolt;
import com.icbc.dccsh.bolt.FlattenBolt;
import com.icbc.dccsh.bolt.FormulaCalcBolt;
import com.icbc.dccsh.bolt.GerFormulaMapper;
import com.icbc.dccsh.bolt.GetHistoryMapper;
import com.icbc.dccsh.bolt.RedisBoltBuilder;
import com.icbc.dccsh.spout.GeneratorSpout;
//import com.icbc.dccsh.spout.KafkaSpout;
import com.icbc.dccsh.spout.KafkaSpout;
import com.icbc.dccsh.storm.es.bolt.EsBolt;
import com.icbc.dccsh.storm.mongodb.bolt.MongoInsertBolt;
import com.icbc.dccsh.storm.mongodb.mapper.JsonMongoMapper;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Builds and submits the log-processing Storm topology.
 *
 * <p>All tunables are read from {@code conf.xml} through {@link #xmlconfig}.
 * The topology wires a {@link GeneratorSpout} through the hash, formula,
 * needHistory, history, calc, flatten and json bolts and finally into an
 * Elasticsearch bolt and a MongoDB bolt.
 *
 * <p>When {@link #main(String[])} receives at least one argument the topology
 * is submitted to the configured cluster under that name; otherwise it runs
 * in an in-process {@link LocalCluster}.
 */
public class LogProcessTopology {
	public static final Logger LOG = LoggerFactory.getLogger(LogProcessTopology.class);

	/** Name of the XML configuration resource loaded at class-initialization time. */
	private static final String CONFIG_FILE = "conf.xml";

	/**
	 * Shared configuration loaded from {@code conf.xml}; {@code null} when
	 * loading failed (the failure is logged by the static initializer).
	 */
	public static XMLConfiguration xmlconfig;

	// Only referenced by the storm-kafka spout wiring, which is currently disabled.
	private final BrokerHosts brokerHosts;

	static {
		try {
			xmlconfig = new XMLConfiguration(CONFIG_FILE);
		} catch (ConfigurationException e) {
			// Keep class loading alive, but record the failure through the logger;
			// requireConfig() turns the missing configuration into a clear error later.
			LOG.error("Failed to load configuration file " + CONFIG_FILE, e);
		}
	}

	/**
	 * @param kafkaZookeeper ZooKeeper connect string ({@code host:port}) used to
	 *                       create the {@link ZkHosts} broker lookup
	 */
	public LogProcessTopology(String kafkaZookeeper) {
		brokerHosts = new ZkHosts(kafkaZookeeper);
	}

	/**
	 * Returns the loaded configuration or fails with a descriptive error.
	 *
	 * @throws IllegalStateException if {@code conf.xml} could not be loaded
	 */
	private static XMLConfiguration requireConfig() {
		if (xmlconfig == null) {
			throw new IllegalStateException(
					"Configuration file " + CONFIG_FILE + " could not be loaded; see the log for the cause");
		}
		return xmlconfig;
	}

	/**
	 * Collects the Kafka consumer settings expected by the project's
	 * {@code KafkaSpout}. Only needed when that spout replaces the generator
	 * spout in {@link #buildTopology()}.
	 */
	private static Map<Object, Object> buildKafkaConf() {
		XMLConfiguration conf = requireConfig();
		Map<Object, Object> kafkaConf = new HashMap<Object, Object>();
		kafkaConf.put("kafka.bootstrap.server", conf.getString("kafka.bootstrapservers", "localhost:9092"));
		kafkaConf.put("kafka.group.id", conf.getString("kafka.groupid", "defaultGroup"));
		kafkaConf.put("kafka.queue", conf.getString("kafka.queue", "logpumper8"));
		return kafkaConf;
	}

	/** Collects the settings handed to the Elasticsearch bolt. */
	private static Map<Object, Object> buildEsConf() {
		XMLConfiguration conf = requireConfig();
		int batchSize = conf.getInt("es.batchsize", 1);
		String esHost = conf.getString("es.host", "localhost:9200");

		Map<Object, Object> esConf = new HashMap<Object, Object>();
		esConf.put("es.nodes", esHost);
		esConf.put("es.input.json", "true");
		esConf.put("es.index.auto.create", "true");
		esConf.put("es.storm.bolt.write.ack", "true");
		esConf.put("es.batch.size.entries", batchSize);
		esConf.put("es.storm.bolt.flush.entries.size", batchSize);
		return esConf;
	}

	/**
	 * Collects the settings handed to the MongoDB bolt.
	 *
	 * @param mongoUrl connection URL including the database name
	 */
	private static Map<Object, Object> buildMongoConf(String mongoUrl) {
		XMLConfiguration conf = requireConfig();
		int batchSize = conf.getInt("mongo.batchsize", 100);
		int flushInterval = conf.getInt("mongo.flushinterval", 10);
		boolean ordered = conf.getBoolean("mongo.ordered", false);

		Map<Object, Object> mongoConf = new HashMap<Object, Object>();
		mongoConf.put("mongo.url", mongoUrl);
		mongoConf.put("mongo.batchsize", batchSize);
		mongoConf.put("mongo.flushinterval", flushInterval);
		mongoConf.put("mongo.ordered", ordered);
		return mongoConf;
	}

	/**
	 * Assembles the topology graph.
	 *
	 * <p>Flow: {@code generator -> hash -> formula -> needHistory}; tuples on
	 * {@code get_history_stream} pass through {@code history} before reaching
	 * {@code calc}, tuples on {@code no_history_stream} go to {@code calc}
	 * directly. {@code calc} feeds {@code flatten} (via {@code flatten_stream})
	 * and {@code json} (via {@code nested_stream}); {@code json} feeds the
	 * Elasticsearch and MongoDB bolts.
	 *
	 * @return the assembled topology
	 * @throws IllegalStateException if {@code conf.xml} could not be loaded
	 */
	public StormTopology buildTopology() {
		XMLConfiguration conf = requireConfig();
		TopologyBuilder builder = new TopologyBuilder();

		// ==> elasticsearch configuration
		Map<Object, Object> esConf = buildEsConf();
		String indexName = conf.getString("es.indexname", "storm/doc");
		int esFlushInterval = conf.getInt("es.flushinterval", 1);
		// <==

		// ==> mongodb configuration
		String mongoConnectionString = conf.getString("mongo.connstr", "mongodb://localhost:27017");
		String mongoDbName = conf.getString("mongo.db", "testdb");
		String mongoCollectionName = conf.getString("mongo.collection", "testcollection");
		String mongoUrl = String.format("%s/%s", mongoConnectionString, mongoDbName);
		Map<Object, Object> mongoConf = buildMongoConf(mongoUrl);
		// <==

		boolean debugMode = conf.getBoolean("storm.bolt.debugmode", false);

		// Source: synthetic generator. To consume from Kafka instead, swap in:
		// builder.setSpout("generator", new KafkaSpout(buildKafkaConf()), 1);
		builder.setSpout("generator", new GeneratorSpout(4000), 1);

		builder.setBolt("hash", new HashBolt(), 1).shuffleGrouping("generator");
		builder.setBolt("formula", RedisBoltBuilder.build(new GerFormulaMapper()), 1).shuffleGrouping("hash");
		builder.setBolt("needHistory", new NeedHistoryBolt(), 2).setNumTasks(4).shuffleGrouping("formula");

		builder.setBolt("history", RedisBoltBuilder.build(new GetHistoryMapper()), 1)
				.shuffleGrouping("needHistory", "get_history_stream");

		// calc receives both the history-enriched tuples and the ones that skipped history.
		builder.setBolt("calc", new FormulaCalcBolt(debugMode), 1)
				.shuffleGrouping("history")
				.shuffleGrouping("needHistory", "no_history_stream");

		builder.setBolt("flatten", new FlattenBolt(), 1).shuffleGrouping("calc", "flatten_stream");
		builder.setBolt("json", new JsonBolt(), 1)
				.shuffleGrouping("flatten")
				.shuffleGrouping("calc", "nested_stream");

		// Sinks.
		builder.setBolt("es-bolt", new EsBolt(indexName, esConf, esFlushInterval), 1)
				.shuffleGrouping("json", "flatten_stream");
		builder.setBolt("mongo", new MongoInsertBolt(mongoUrl, mongoCollectionName, mongoConf, new JsonMongoMapper()))
				.shuffleGrouping("json", "nested_stream");

		return builder.createTopology();
	}

	/**
	 * Entry point.
	 *
	 * <p>Kafka ZooKeeper, Nimbus and Storm ZooKeeper settings are read from the
	 * configuration in both modes before the topology is built.
	 *
	 * @param args if non-empty, {@code args[0]} is the topology name and the
	 *             topology is submitted to the cluster; otherwise the topology
	 *             runs in a local cluster for one hour and is then shut down
	 * @throws Exception on configuration, submission or interruption errors
	 */
	public static void main(String[] args) throws Exception {
		XMLConfiguration conf = requireConfig();

		// Initialize outside configuration
		String kafkaZk = String.format("%s:%s", conf.getString("kafka.zookeeper.host"),
				conf.getInt("kafka.zookeeper.port"));

		String nimbusHost = conf.getString("storm.nimbus.host");
		int nimbusPort = conf.getInt("storm.nimbus.port");
		String[] zookeeperHosts = conf.getStringArray("storm.zookeeper.host");
		int zookeeperPort = conf.getInt("storm.zookeeper.port");

		LogProcessTopology logProcessTopology = new LogProcessTopology(kafkaZk);
		Config config = new Config();
		config.setDebug(false);
		config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 200000);

		StormTopology stormTopology = logProcessTopology.buildTopology();
		if (args != null && args.length > 0) {
			System.out.println("in the cluster mode...");
			System.out.println(args[0]);
			String name = args[0];
			config.setNumWorkers(1);
			config.put(Config.NIMBUS_HOST, nimbusHost);
			config.put(Config.NIMBUS_THRIFT_PORT, nimbusPort);
			config.put(Config.STORM_ZOOKEEPER_PORT, zookeeperPort);
			config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zookeeperHosts));
			StormSubmitter.submitTopology(name, config, stormTopology);
		} else {
			System.out.println("in the local mode...");
			config.setNumWorkers(1);
			LocalCluster cluster = new LocalCluster();
			try {
				cluster.submitTopology("kafka", config, stormTopology);

				Thread.sleep(1000 * 3600);
			} finally {
				// Always tear the in-process cluster down, even if the sleep is interrupted.
				cluster.shutdown();
			}
		}
	}
}
